package com.yeskery.nut.core;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Base implementation of {@link ResponseEmitterHandler}.
 *
 * <p>Producers hand data to {@link #send(byte[])}, which only enqueues it. The thread that
 * calls {@link #park()} writes the response headers and then blocks, draining the queue and
 * passing each chunk to {@link #doSend(byte[])} until the handler is finished, either through
 * {@link #completed()} or through the inactivity timeout.
 *
 * <p>When the emitter reports a positive timeout, a daemon {@link Timer} decrements a counter
 * once every 1000 ms; every successful {@code send} resets the counter to the emitter's timeout
 * value. When the counter reaches zero the timeout callback runs and the handler is completed.
 *
 * <p>State transitions (finishing, timer creation, leaving {@code park}) are serialized on
 * this instance's monitor.
 *
 * @author YESKERY
 * 2024/7/12
 */
public abstract class BaseResponseEmitterHandler implements ResponseEmitterHandler {

    /** Logger. */
    private static final Logger logger = Logger.getLogger(BaseResponseEmitterHandler.class.getName());

    /** Emitter that supplies the timeout value and the timeout/error callbacks. */
    private final ResponseBodyEmitter responseBodyEmitter;

    /** Inactivity timer; only created when a timeout is configured. Guarded by {@code this}. */
    private Timer timer;

    /** Remaining timeout ticks (one tick per second); {@code null} when no timeout is configured. */
    private final AtomicLong timeout;

    /** Thread currently blocked in {@link #park()}, or {@code null} when no thread is parked. */
    private  volatile Thread executeThread;

    /** Whether {@link #finish()} interrupted the parked thread. Guarded by {@code this}. */
    private boolean finishSignalSent;

    /** Queue of data chunks waiting to be written by the parked thread. */
    private final LinkedBlockingQueue<byte[]> dataQueue = new LinkedBlockingQueue<>();

    /** Set once the handler has finished; no further data is accepted afterwards. */
    private volatile boolean endFlag = false;

    /**
     * Creates the handler.
     *
     * @param responseBodyEmitter emitter providing timeout and callbacks, must not be {@code null}
     * @throws IllegalArgumentException if {@code responseBodyEmitter} is {@code null}
     */
    protected BaseResponseEmitterHandler(ResponseBodyEmitter responseBodyEmitter) {
        if (responseBodyEmitter == null) {
            throw new IllegalArgumentException("ResponseBodyEmitter Must Not Be Null.");
        }
        this.responseBodyEmitter = responseBodyEmitter;
        if (responseBodyEmitter.getTimeout() > 0) {
            this.timeout = new AtomicLong(responseBodyEmitter.getTimeout());
        } else {
            // A non-positive timeout disables the inactivity timer entirely.
            this.timeout = null;
        }
    }

    /**
     * Queues the UTF-8 encoding of {@code data} for sending.
     *
     * @param data text to send
     * @throws IllegalStateException if the handler has already finished
     */
    @Override
    public void send(String data) {
        send(data.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Queues {@code data} for the parked thread to write and resets the inactivity timeout.
     *
     * <p>If queuing fails, the handler is finished and the failure is passed to the emitter's
     * error callback, or thrown as a {@code NutException} when no callback is registered.
     *
     * @param data bytes to send
     * @throws IllegalStateException if the handler has already finished
     */
    @Override
    public void send(byte[] data) {
        if (endFlag) {
            throw new IllegalStateException("ResponseEmitterHandler Already Completed.");
        }
        try {
            dataQueue.offer(data);
            if (this.timeout != null) {
                this.timeout.set(responseBodyEmitter.getTimeout());
            }
        } catch (Exception e) {
            finish();
            reportFailure("ResponseEmitter Send Fail.", e);
        }
    }

    /**
     * Completes the response: runs {@link #doCompleted()} and then finishes the handler, which
     * releases the thread blocked in {@link #park()}.
     *
     * <p>The check of the end flag and the completion itself run under this instance's monitor,
     * so a concurrent timeout cannot run {@link #doCompleted()} a second time. A failure of
     * {@link #doCompleted()} is passed to the emitter's error callback (outside the monitor), or
     * thrown as a {@code NutException} when no callback is registered.
     */
    @Override
    public void completed() {
        Exception failure = null;
        synchronized (this) {
            if (endFlag) {
                throw new NutException("ResponseEmitterHandler Already Completed.");
            }
            try {
                doCompleted();
            } catch (Exception e) {
                failure = e;
            } finally {
                // The handler is finished whether or not the completion hook succeeded.
                finish();
            }
        }
        if (failure != null) {
            reportFailure("ResponseEmitter Complete Fail.", failure);
        }
    }

    /**
     * Sends the response headers and then blocks the calling thread, writing queued data via
     * {@link #doSend(byte[])} until the handler is finished.
     *
     * <p>NOTE(review): chunks still queued at the moment the end flag is raised are not written;
     * the loop re-checks the flag before every {@code take()}.
     *
     * @throws IllegalStateException if the handler has already finished
     */
    @Override
    public void park() {
        if (endFlag) {
            throw new IllegalStateException("ResponseEmitterHandler Already Completed.");
        }
        try {
            doSendHeaders();
        } catch (IOException e) {
            throw new NutException("ResponseEmitterHandler Send Header Fail.", e);
        }
        beginPark();
        boolean interruptConsumed = false;
        try {
            while (!endFlag) {
                doSend(dataQueue.take());
            }
        } catch (InterruptedException e) {
            // Either the wake-up signal from finish() or an external interrupt; endPark() decides.
            interruptConsumed = true;
        } catch (IOException e) {
            throw new NutException("ResponseEmitter Send Fail.", e);
        } finally {
            endPark(interruptConsumed);
        }
    }

    /**
     * Sends the response headers.
     * @throws IOException on I/O failure
     */
    protected abstract void doSendHeaders() throws IOException;

    /**
     * Hook invoked when the response is being completed.
     * @throws IOException on I/O failure
     */
    protected abstract void doCompleted() throws IOException;

    /**
     * Writes one chunk of data.
     * @param bytes bytes to write
     * @throws IOException on I/O failure
     */
    protected abstract void doSend(byte[] bytes) throws IOException;

    /**
     * Writes data read from a stream.
     * @param inputStream stream to read from
     * @throws IOException on I/O failure
     */
    protected abstract void doSend(InputStream inputStream) throws IOException;

    /**
     * Registers the calling thread as the parked thread and, when a timeout is configured,
     * starts the inactivity timer.
     *
     * <p>Runs under the monitor and re-checks the end flag so that a {@link #finish()} that
     * slipped in before this call cannot leave behind a timer nobody will ever cancel.
     */
    private synchronized void beginPark() {
        executeThread = Thread.currentThread();
        if (timeout == null || endFlag) {
            return;
        }
        timer = new Timer("responseEmitterHandlerAsyncTimer", true);
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                if (timeout.get() <= 0) {
                    timeout();
                } else {
                    timeout.decrementAndGet();
                }
            }
        }, 0, 1000);
    }

    /**
     * Unregisters the parked thread and reconciles its interrupt status.
     *
     * <p>Clearing the reference guarantees that a later {@link #finish()} cannot interrupt a
     * thread that has already left {@link #park()}. An interrupt sent by {@link #finish()} that
     * was not consumed by the blocking {@code take()} is cleared so it does not leak to the
     * caller; an interrupt that did not come from {@link #finish()} is restored.
     *
     * @param interruptConsumed whether {@code park()} caught an {@link InterruptedException}
     */
    private synchronized void endPark(boolean interruptConsumed) {
        executeThread = null;
        if (finishSignalSent) {
            if (!interruptConsumed) {
                // finish() interrupted us while we were not blocked in take(); drop that signal.
                Thread.interrupted();
            }
        } else if (interruptConsumed) {
            // The interrupt came from outside this handler: preserve it for the caller.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Routes a failure to the emitter's error callback, or throws it wrapped in a
     * {@code NutException} when no callback is registered.
     *
     * @param message message for the wrapping exception
     * @param cause the original failure
     */
    private void reportFailure(String message, Exception cause) {
        Consumer<Throwable> onError = responseBodyEmitter.getOnError();
        if (onError == null) {
            throw new NutException(message, cause);
        }
        onError.accept(cause);
    }

    /**
     * Handles expiry of the inactivity timeout: runs the emitter's timeout callback, finishes
     * the handler and invokes {@link #doCompleted()}. Failures are logged, not propagated.
     */
    private synchronized void timeout() {
        if (endFlag) {
            return;
        }
        Runnable onTimeout = responseBodyEmitter.getOnTimeout();
        if (onTimeout != null) {
            try {
                onTimeout.run();
            } catch (Exception e) {
                logger.log(Level.SEVERE, "ResponseEmitter Timeout Callback Execute Fail.", e);
            }
        }
        if (endFlag) {
            // The callback already completed the handler; do not complete it a second time.
            return;
        }
        finish();
        try {
            doCompleted();
        } catch (Exception e) {
            logger.log(Level.SEVERE, "ResponseEmitter Timeout Complete Fail.", e);
        }
    }

    /**
     * Marks the handler as finished: raises the end flag, cancels the timer and interrupts the
     * parked thread so it stops waiting for data. Subsequent calls are no-ops.
     */
    private synchronized void finish() {
        if (endFlag) {
            return;
        }
        if (this.timeout != null) {
            this.timeout.set(0L);
        }
        endFlag = true;
        if (timer != null) {
            timer.cancel();
        }
        Thread parked = executeThread;
        if (parked != null) {
            parked.interrupt();
            // Remembered so endPark() can tell this signal apart from an external interrupt.
            finishSignalSent = true;
        }
    }
}
